以下适用于版本2.0.x。
public class KafkaProducer<K,V> extends java.lang.Object implements Producer<K,V>
线程安全
为了提高效率,建议实现为单例。
版本>=0.10,客户端可以与broker通信。版本不一致抛出UnsupportedVersionException。
1 | Properties props = new Properties(); |
版本>=0.11,Kafka支持两种Producer:幂等和事务。
幂等
幂等生产者加强了Kafka的传递语义。即使重试,也只传递仅一次。
通过enable.idempotence配置。自动将重试设为Integer.MAX_VALUE,将请求完成标准acks设置为all。无需其他额外配置。
为了实现幂等语义(仅传递一次),需要避免应用级别的重发。建议不要设置重试次数。如果在重试次数无数的情况下,依旧抛出异常,建议关闭生产者,并检查最近生产的消息是否重复。
只在单个会话中保证幂等。
事务
事务允许发送消息给多个分区或主题。
事务API是阻塞的,失败时抛出异常。
transactional.id用于在单一生产者的多个会话中事务恢复。用于分区、有状态的应用间唯一标识生产者。设置后幂等生产者相关的配置也被设置。此外,关联的主题应该设置持久化配置。如replication.factor为3,min.insync.replicas为2。为了实现端到端的事务保证,消费者应该只读取提交的消息。
示例如下,与上述示例类似,但所有的100条消息都在一个事务中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
// 标记已成功的写入为aborted
producer.abortTransaction();
}
producer.close();注意:
同一时刻,只能有一个开放的事务。
beginTransaction()和commitTransaction()间的消息属于同一事务。
transactional.id设置后,发送的消息必须属于某个事务。
事务过程中的异常直接抛出,不必调用Future或回调函数。